Activiti 通过事务方式执行流程,可以根据你的需求定制。现在开始看一下 Activiti 通常是如何处理事务的。 如果触发了 Activiti 的操作(比如,开始流程,完成任务,触发流程继续执行), Activiti 会推进流程,直到每个分支都进入等待状态。更抽象的说,它会流程图执行深度优先搜索, 如果每个分支都遇到等待状态,就会返回。等待状态是"稍后"需要执行任务, 就是说 Activiti 会把当前状态保存到数据库中,然后等待下一次触发。 触发可能来自外部,比如用户任务或接收到一个消息,也可能来自 Activiti 本身,比如我们设置了定时器事件。 下面图片展示了这种操作:
我们可以看到包含用户任务,服务任务和定时器事件的流程。完成用户任务,和校验地址是在同一个工作单元中, 所以它们的成功和失败是原子性的。意味着如果服务任务抛出异常,我们要回滚当前事务, 这样流程会退回到用户任务,用户任务就依然在数据库里。 这就是 Activiti 默认的行为。在(1)中应用或客户端线程完成任务。这会执行服务,流程推进,直到遇到一个等待状态, 这里就是定时器(2)。然后它会返回给调用者(3),并提交事务(如果事务是由activiti开启的)。
有的时候,这不是我们想要的。有时我们需要自己控制流程中事务的边界,这样就能把业务逻辑包裹在一起。 这就需要使用异步执行了。参考下面的流程(判断):
这次我们完成了用户任务,生成一个发票,把发票发送给客户。 这次生成发票不在同一个工作单元内了,所以我们不想对用户任务进行回滚,如果生成发票出错了。 所以,我们想让 Activiti 实现的是完成用户任务(1),提交事务,返回给调用者应用。然后在后台的线程中,异步执行生成发票。 后台线程就是 Activiti 的 job 执行器(其实是一个线程池)周期对数据库的 job 进行扫描。 所以后面的场景,当我们到达"generate invoice"任务,我们为 activiti 创建一个稍后执行的job "消息", 并把它保存到数据库。job 会被 job 执行器获取并执行。我们也会给本地 job 执行器一个提醒,告诉它有一个新 job,来增加性能。
要想使用这个特性,我们要使用 activiti:async="true" 扩展。例子中,服务任务看起来就是这样:
<serviceTask id="service1" name="Generate Invoice" activiti:class="my.custom.Delegate" activiti:async="true" />
activiti:async 可以使用到如下 bpmn 任务类型中: task, serviceTask, scriptTask, businessRuleTask, sendTask, receiveTask, userTask, subProcess, callActivity
对于 userTask,receiveTask 和其他等待装填,异步执行的作用是让开始流程监听器运行在一个单独的线程/事务中。
Activiti 在其默认配置,重试 3 次工作,当在一个作业执行遇到任何异常情况。这对异步任务工作来说也成立。在某些情况下,就需要更多的灵活性。有两个参数进行配置:
这些参数可以通过配置 activiti:failedJobRetryTimeCycle。这里是一个简单的使用示例:
<serviceTask id="failingServiceTask" activiti:async="true" activiti:class="org.activiti.engine.test.jobexecutor.RetryFailingDelegate">
<extensionElements>
<activiti:failedJobRetryTimeCycle>R5/PT7M</activiti:failedJobRetryTimeCycle>
</extensionElements>
</serviceTask>
从 Activiti 5.9开始,JobExecutor 能保证同一个流程实例中的 job 不会并发执行。为啥呢?
参考如下流程定义:
我们有一个并行网关,后面有三个服务任务,它们都设置为异步执行。这样会添加三个 job 到数据库里。 一旦 job 进入数据库,它就可以被jobExecutor 执行了。JobExecutor 会获取 job,把它们代理到工作线程的线程池中,会在那里真正执行 job。 就是说,使用异步执行,你可以把任务分配给这个线程池(在集群环境,可能会使用多个线程池)。这通常是个好事情。 然而它也会产生问题:一致性。考虑一下服务任务后的汇聚。 当服务任务完成后,我们到达并发汇聚节点,需要决定是等待其他分支,还是继续向下执行。 就是说,对每个到达并行汇聚的分支,我们都需要判断是继续还是等待其他分支的一个或多个分支。
为什么这就是问题了呢?因为服务任务配置成使用异步执行,可能相关的job 都在同一时间被获取,被 JobExecutor 分配给不同的工作线程执行。 结果是三个单独的服务执行使用的事务在到达并发汇聚时可能重叠。如果出现了这个问题,这些事务是互相不可见的, 其他事务同时到达了相同的并发汇聚,假设它们都在等待其他分支。然而,每个事务都假设它们在等待其他分支, 所以没有分支会越过并发汇聚继续执行,流程实例会一直在等待状态,无法继续执行。
Activiti 是如何解决这个问题的? Activiti 使用了乐观锁。当我们基于判断的数据看起来不是最新的时 (因为其他事务可能在我们提交之前进行了修改,我们会在每个事务里增加数据库同一行的版本)。这时,第一个提交的事务会成功, 其他会因为乐观锁异常导致失败。这就解决了我们上面讨论的流程的问题:如果多个分支同步到达并行汇聚, 它们会假设它们都在登录,并增加它们父流程的版本号(流程实例)然后尝试提交。 第一个分支会成功提交,其他分支会因为乐观锁导致失败。因为流程是被 job 触发的, Activiti 会尝试在等待一段时间后尝试执行同一个 job,想这段时间可以同步网关的状态。
这是一个很好的解决方案吗?像我们看到的一样,乐观锁允许 Activiti 避免非一致性。它确定我们不会“堵在汇聚网关”, 意思是:或者所有分支都通过网关,或者数据库中的 job 正在尝试通过。然而,虽然这是一个对于持久性和一致性的完美解决方案, 但对于上层来说不一定是期望的行为:
在 Activiti 5.9 中,我们推荐了新的概念,并已经在 jBPM 4中实现了,叫做“排他job”。
对于一个流程实例,排他任务不能同时执行两个。考虑上面的流程: 如果我们把服务任务申请为排他任务,JobExecutor 会保证对应的 job 不会并发执行。 相反,它会保证无论什么时候获取一个流程实例的排他任务,都会把同一个流程实例的其他任务都取出来,放在同一个工作线程中执行。 它保证 job 是顺序执行的。
如何启用这个特性?从 Activiti 5.9 开始,排他任务已经是默认配置了。所以异步执行和定时器事件默认都是排他任务。 另外,如果你想把 job 设置为非排他,可以使用 activiti:exclusive="false" 进行配置。 比如,下面的服务任务就是异步但是非排他的。
<serviceTask id="service" activiti:expression="${myService.performBooking(hotel, dates)}" activiti:async="true" activiti:exclusive="false" />
这是一个好方案吗? 有一些人问我们这是否是一个好方案。他们的结论会帮你在并发和性能问题方面节省时间。 这个问题上需要考虑两件事情: